package com.zhao.apitest.source;

import com.zhao.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Demo job that reads records from a user-defined source and prints them.
 *
 * @author ZhaoPan
 * @date 2022/3/9
 */
public class SourceTest4_UDF {
    /**
     * Builds a streaming job with parallelism 1 that attaches {@link MySensorSource},
     * prints every emitted record, and executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read from the custom source defined below (not from a file).
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

        dataStream.print();

        env.execute();
    }

    /**
     * Custom source that simulates a fixed set of temperature sensors.
     *
     * <p>Each sensor starts at a random temperature and then drifts by a random
     * amount on every round; one reading per sensor is emitted per round until
     * {@link #cancel()} is called.
     */
    public static class MySensorSource implements SourceFunction<SensorReading> {
        /** Number of simulated sensors, named {@code sensor_1} .. {@code sensor_N}. */
        private static final int SENSOR_COUNT = 10;

        /** Pause between two emission rounds, in milliseconds. */
        private static final long EMIT_INTERVAL_MILLIS = 1000L;

        /**
         * Flag controlling the emission loop. It is {@code volatile} because the
         * SourceFunction contract notes that {@code cancel()} is typically invoked
         * from a different thread than {@code run()}; without it the loop may never
         * observe the update.
         */
        private volatile boolean running = true;

        /**
         * Emits one reading per sensor roughly once per second until cancelled.
         *
         * @param ctx context used to emit the generated readings
         * @throws Exception if the sleep between rounds is interrupted
         */
        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            Random random = new Random();

            // Initial temperatures: Gaussian with mean 60 and standard deviation 20.
            Map<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 1; i <= SENSOR_COUNT; i++) {
                sensorTempMap.put("sensor_" + i, 60 + random.nextGaussian() * 20);
            }

            while (running) {
                // Iterate over entries so each sensor needs a single lookup; setValue
                // updates in place and is not a structural modification of the map.
                for (Map.Entry<String, Double> entry : sensorTempMap.entrySet()) {
                    // Random walk: shift the previous temperature by a standard Gaussian step.
                    double newTemp = entry.getValue() + random.nextGaussian();
                    entry.setValue(newTemp);
                    ctx.collect(new SensorReading(entry.getKey(), System.currentTimeMillis(), newTemp));
                }
                Thread.sleep(EMIT_INTERVAL_MILLIS);
            }
        }

        /** Requests termination; the loop in {@code run} exits after its current round. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}
